Kinesis Producer Library(KPL)とfluentdとLambdaを連携してKinesisのスループットを上げる
2016/06/03(金)のAWS Summit Tokyo 2016でクックパッド株式会社の星さんが「秒間数万のログをいい感じにするアーキテクチャ」という発表をされました。
クックパッドのログ収集基盤が Fluentd や Amazon Kinesis Streams や Redshift を活用していい感じするアーキテクチャが参考になった方は非常に多いと思います(あの規模のサービスに関わるかはともかく)。
発表の中でKinesis Producer Library(以下 KPL)を使ってログ数多すぎ問題を解決したことが語られていました。
個人的には、Kinesis Streams のシャード数を増やせばどうにかなるレベルのサービスしか担当したことがないため、KPL の検討は見送ってきましたが、発表にあった KPL のログ集約によるスループットの向上が衝撃的だったため、最小構成の KPL を実際に動かしてみました。
なお、今回紹介する構成を
- Amazon Kinesis Data Streams
- KPL
- fluentd
- Amazon Linux(1 -> 2)
- Lambda(Python 2.7 ->Python 3.6)
に更新したバージョンは、次のブログを参照下さい。
Kinesis Producer Library(KPL)とfluentdとLambda(Python3)を連携させる(Amazon Linux2版)
KPL について
KPL は 効率的に Kinesis Streams に書きこむライブラリで、以下の様な特徴があります
- 自動的で設定可能な再試行メカニズムにより 1 つ以上の Amazon Kinesis stream へ書き込む
- レコードを収集し、PutRecords を使用して、リクエストごとに複数シャードへ複数レコードを書き込む
- ユーザーレコードを集約し、ペイロードサイズを増加させ、スループットを改善する
- コンシューマーで Amazon Kinesis Client Library(KCL)とシームレスに統合して、バッチ処理されたレコードを集約解除する
- Amazon CloudWatch メトリックスをユーザーに代わって送信し、プロデューサーのパフォーマンスを確認可能にする
KPL の詳細は次のドキュメントを参照ください。
- http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html
- https://blogs.aws.amazon.com/bigdata/post/Tx3ET30EGDKUUI2/Implementing-Efficient-and-Reliable-Producers-with-the-Amazon-Kinesis-Producer-L
KPL の動作確認内容
1. 最小構成の KPL ログ集約型 Kinesis メッセージングシステムを構築
- ストリームには Amazon Kinesis Streams を利用
- Producer には aws-fluent-plugin-kinesis を使い、 KPL で aggregate させる
- Consumer には Lambda(Python 2.7) を使い awslabs/kinesis-aggregation でログを deaggregate させる
2. ログ集約によるレコード数の削減を確認
- ストリームには Amazon Kinesis Streams を利用
- Producer には aws-fluent-plugin-kinesis を利用
- 非 aggregate 方式と KPL による aggregate 方式で、書き込み時のレコード数の違いを確認
検証環境
- OS : Amazon Linux AMI release 2016.03
- fluentd : td-agent 2.3.1-0.el2015
- fluent-plugin-kinesis : 1.0.1
では、順に、動作確認します。
1. 最小構成を構築
最終形
EC2 に IAM Role の設定
サーバーにAWS認証情報を配布して、リクエスト時に認証情報を利用する代わりに、EC2インスタンスに IAM Role を設定してリソース操作を許可します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "cloudwatch:PutMetricData" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "kinesis:*" ], "Resource": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/*” } ] }
Kinesis ストリームの操作を許可するほか、KPL はカスタムメトリクスも利用するため"cloudwatch:PutMetricData”
も許可してください。
許可し忘れていると、fluentd が以下の様なcloudwatch:PutMetricData
の実行エラーログを吐きます。
<ErrorResponse xmlns="http://monitoring.amazonaws.com/doc/2010-08-01/"> <Error> <Type>Sender</Type> <Code>AccessDenied</Code> <Message>User: arn:aws:sts::123456789012:assumed-role/KinesisFluentd/i-7a6a0ae5 is not authorized to perform: cloudwatch:PutMetricData</Message> </Error> <RequestId>dd56c5ed-299b-11e6-b9b2-299f16fe361d</RequestId> </ErrorResponse>
ポリシーの詳細は次のドキュメントを参照ください。
http://docs.aws.amazon.com/kinesis/latest/dev/controlling-access.html
Kinesis Stream の設定
Kinesis Stream をコマンドラインから設定します。 今回はストリーム名は「test」とします。
$ STREAM_NAME=test $ aws kinesis create-stream --stream-name $STREAM_NAME --shard-count 2 $ aws kinesis wait stream-exists --stream-name $STREAM_NAME # wait until `StreamStatus` becomes `ACTIVE`
Apache のインストール
今回は Apache のログを Kinesis Stream に送信します。 Apache をインストールします。
$ sudo yum install -y httpd $ sudo service httpd start Starting httpd: [ OK ]
HTTP アクセスして、Apache が起動していることを確認しましょう。
$ curl localhost/
fluentd の設定
最後に fluentd の設定を行います。
fluentd のインストール
fluentd の安定版である td-agent をインストールします。
$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh ... Installed: td-agent.x86_64 0:2.3.1-0.el2015 Complete!
fluentd の Kinesis Stream プラグインのインストール
Kinesis Stream プラグインとして fluent-plugin-kinesis をインストールします。
$ sudo td-agent-gem install fluent-plugin-kinesis Fetching: concurrent-ruby-1.0.2.gem (100%) Successfully installed concurrent-ruby-1.0.2 Fetching: os-0.9.6.gem (100%) Successfully installed os-0.9.6 Fetching: middleware-0.1.0.gem (100%) Successfully installed middleware-0.1.0 Fetching: protobuf-3.6.9.gem (100%) Successfully installed protobuf-3.6.9 Fetching: fluent-plugin-kinesis-1.0.1.gem (100%) Successfully installed fluent-plugin-kinesis-1.0.1 Parsing documentation for concurrent-ruby-1.0.2 Installing ri documentation for concurrent-ruby-1.0.2 Parsing documentation for os-0.9.6 Installing ri documentation for os-0.9.6 Parsing documentation for middleware-0.1.0 Installing ri documentation for middleware-0.1.0 Parsing documentation for protobuf-3.6.9 Installing ri documentation for protobuf-3.6.9 Parsing documentation for fluent-plugin-kinesis-1.0.1 Installing ri documentation for fluent-plugin-kinesis-1.0.1 Done installing documentation for concurrent-ruby, os, middleware, protobuf, fluent-plugin-kinesis after 7 seconds 5 gems installed
KPL 向けに protobuf など、いろいろな gem がインストールされています。
fluentd設定ファイルの修正
/etc/td-agent/td-agent.conf
にある設定ファイルを修正します。
アクセスログファイル/var/log/httpd/access_log
を tail
し、書き込まれたデータを Kinesis Stream に送信します。
データソースの定義が <source>
タグ、データ処理の定義が<match>
タグです。
<source> @type tail format apache2 path /var/log/httpd/access_log pos_file /var/log/td-agent/httpd-access.pos tag log.httpd.access </source> <match log.httpd.*> @type kinesis_producer stream_name test region ap-northeast-1 </match>
fluentd を KPL の Producer として利用するには match ディレクティブの type を kinesis_producer
にします。
詳細は次のドキュメントを参照ください。
https://github.com/awslabs/aws-fluent-plugin-kinesis#configuration-kinesis_producer
設定ファイルのシンタックスチェック
設定ファイルのシンタックスチェックをします。
$ td-agent --dry-run -c /etc/td-agent/td-agent.conf 2016-06-04 17:21:45 +0000 [info]: reading config file path="kpl.conf" 2016-06-04 17:21:45 +0000 [info]: starting fluentd-0.12.20 as dry run mode 2016-06-04 17:21:45 +0000 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.1' 2016-06-04 17:21:45 +0000 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6' 2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-kinesis' version '1.0.1' 2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-mongo' version '0.7.12' 2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.4' 2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-s3' version '0.6.5' 2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-scribe' version '0.10.14' 2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-td' version '0.10.28' 2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2' 2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-webhdfs' version '0.4.1' 2016-06-04 17:21:45 +0000 [info]: gem 'fluentd' version '0.12.20' 2016-06-04 17:21:45 +0000 [info]: adding match pattern="log.httpd.*" type="kinesis_producer" 2016-06-04 17:21:45 +0000 [info]: adding source type="tail" 2016-06-04 17:21:45 +0000 [info]: using configuration file: <ROOT> <source> @type tail format apache2 path /var/log/httpd/access_log pos_file /var/log/td-agent/httpd-access.pos tag log.httpd.access </source> <match log.httpd.*> @type kinesis_producer stream_name kpl region ap-northeast-1 debug true log_level info <kinesis_producer> </kinesis_producer> </match> </ROOT> $ echo $? 0
エラーメッセージが表示されず、ステータスコードも正常の「0」です。
fluentd の起動
fluentd をフォアグラウンド実行してみましょう。
$ sudo td-agent -c /etc/td-agent/td-agent.conf 2016-06-04 17:36:30 +0000 [info]: reading config file path="/etc/td-agent/td-agent.conf" 2016-06-04 17:36:30 +0000 [info]: starting fluentd-0.12.20 2016-06-04 17:36:30 +0000 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.1' 2016-06-04 17:36:30 +0000 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6' 2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-kinesis' version '1.0.1' 2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-mongo' version '0.7.12' 2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.4' 2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-s3' version '0.6.5' 2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-scribe' version '0.10.14' 2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-td' version '0.10.28' 2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2' 2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-webhdfs' version '0.4.1' 2016-06-04 17:36:30 +0000 [info]: gem 'fluentd' version '0.12.20' 2016-06-04 17:36:30 +0000 [info]: adding match pattern="log.httpd.*" type="kinesis_producer" 2016-06-04 17:36:30 +0000 [info]: adding source type="tail" 2016-06-04 17:36:30 +0000 [info]: using configuration file: <ROOT> <source> @type tail format apache2 path /var/log/httpd/access_log pos_file /var/log/td-agent/httpd-access.pos tag log.httpd.access </source> <match log.httpd.*> @type kinesis_producer stream_name kpl region ap-northeast-1 debug true log_level info <kinesis_producer> </kinesis_producer> </match> </ROOT> 2016-06-04 17:36:30 +0000 [info]: following tail of /var/log/httpd/access_log [2016-06-04 17:36:31.797746] [0x00007fbc3f4d1740] [info] [metrics_manager.h:148] Uploading metrics to monitoring.ap-northeast-1.amazonaws.com:443
Apache にリクエストをして、Kinesis Stream に送信するデータを生成します。
$ curl localhost
しばらく動かしてみて、問題なさそうなら、 fluentd をバックグラウンド実行しましょう。
$ sudo service td-agent start Starting td-agent: [ OK ]
pstree
の実行結果から td-agent の子プロセスとして KPL プロセス(kinesis_produce)が起動しているのがわかります。
$ pstree -c 7561 sudo───td-agent─┬─td-agent─┬─kinesis_produce─┬─{kinesis_produce} │ │ ├─{kinesis_produce} │ │ ├─{kinesis_produce} │ │ ├─{kinesis_produce} │ │ ├─{kinesis_produce} │ │ └─{kinesis_produce} │ ├─{td-agent} │ ├─{td-agent} │ ├─{td-agent} │ ├─{td-agent} │ ├─{td-agent} │ ├─{td-agent} │ ├─{td-agent} │ ├─{td-agent} │ └─{td-agent} └─{td-agent}
Lambda 関数の実装
KPL は複数のログを1レコードに集約(aggregate)してKinesisにPutRecord します。 そのため、Consumer 側は集約されたログを分割(deaggregate)します。
awslabs/kinesis-aggregation で Python Lambda を実装
AWS が GitHub に公開している awslabs/kinesis-aggregation という Lambda 向け KPL の aggregate/deaggregate ライブラリを利用します。
- Python
- Java
- Node.js
と各 Lambda ランタイム向けライブラリが含まれていますが、今回は Python のものを利用します。
$ git clone https://github.com/awslabs/kinesis-aggregation.git $ cd kinesis-aggregation/python/
移動したディレクトリは、KPL で送信された集約ログを分解し、print するだけの Lambda 関数(lambda_function.py
) がブループリントとして用意されているので、今回はこのファイルを利用しましょう。
lambda_function.py の中身を抜粋
aws_kinesis_agg モジュールには集約されたログを分割する Lambda 向け(悪く言うと、Lambda イベントソース決め打ち)の関数が用意されています。あとは、その関数を呼び出して、各ログを処理するだけです。
- ジェネレーターベース : iter_deaggregate_records
- 非ジェネレーターベース : deaggregate_records
の2種類があります。
実際の Lambda 関数を一部抜粋します。
from aws_kinesis_agg.deaggregator import deaggregate_records, iter_deaggregate_records import base64 def lambda_bulk_handler(event, context): '''A Python AWS Lambda function to process Kinesis aggregated records in a bulk fashion.''' raw_kinesis_records = event['Records'] #Deaggregate all records in one call user_records = deaggregate_records(raw_kinesis_records) #Iterate through deaggregated records for record in user_records: # Kinesis data in Python Lambdas is base64 encoded payload = base64.b64decode(record['kinesis']['data']) print('%s' % (payload)) return 'Successfully processed {} records.'.format(len(user_records))
Lambda パッケージ(Zip)化
lambda_function.py
を元に Lambda 関数として Zip ファイルにまとめるにはどうすればよいでしょうか?
直接利用している aws_kinesis_agg モジュールの他に、シリアライズに利用している Google Protobuf モジュールなども必要です。
Protobuf は Lamba パッケージ化するのが面倒なため、以下のコマンドを叩くと、python_lambda_build.zip
というZipファイル化してくれます。
$ python make_lambda_build.py
lambda_function.py
をカスタマイズして、依存ライブラリを追加した時は、make_lambda_build.py
の上のほうにある定数 PIP_DEPENDENCIES
に依存ライブラリを追加してください。
AWSマネージメントコンソールからLambda 関数を登録
作成したZipファイルを管理画面からアップロードして Lambda 関数を以下の内容で作成します。
- Runtime : Python 2.7
- Handler : lambda_function.lambda_generator_handler
- Event sources : “test” Kinesis Streams
CLoudWatch Logs から Lambda の実行ログを確認
Lambda 関数内では、分割した各ログデータを print 出力しています。 CloudWatch > Log Groups > Streams for /aws/lambda/YOUR_LAMBDA_FUNCTION_NAME のログストリームに、deaggregate されたログが出力されていることを確認します。
CloudWatch Custom Metrics の確認
KPL CloudWatch Custom Metrics を出力します。
集約前のログ(User Records)数やRecordPuts 時のエラー状況など、標準の CloudWatch では取得されないメトリクス情報が記録されます。
詳細は次のドキュメントを参照ください。
http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kpl.html
2. ログ集約によるレコード数の削減を確認
次に、非 aggregate 方式と KPL による aggregate 方式で、書き込み時のレコード数の違いを確認してみましょう。
Kinesis Streams の作成
- kinesis_streams(非 aggregate 用)
- kinesis_producer(KPL aggregate 用)
という2本の Kinesis ストリームを作成します。
fluentd の設定
同じログを上記それぞれの Kinesis Streams に集約方式を変えて送信するように以下の設定を行います。
match ディレクトリを @type copy
で複製し、片方は @type kinesis_streams
、もう片方は @type kinesis_producer
にしています。
<source> @type tail format apache2 path /var/log/httpd/access_log pos_file /var/log/td-agent/httpd-access.pos tag log.httpd.access </source> <match log.httpd.*> @type copy <store> @type kinesis_streams stream_name kinesis_streams region ap-northeast-1 # random_partition_key true #use_yajl true debug true log_level info </store> <store> @type kinesis_producer stream_name kinesis_producer region ap-northeast-1 # random_partition_key true #use_yajl true debug true log_level info </store> </match>
Apache にリクエストを繰り返す
以下の様なシェルスクリプトを用意し、Apache にランダムインターバルで HTTP リクエストさせます。
#!/bin/bash while : do curl --silent localhost -o /dev/null sleep `echo "scale=1; $RANDOM % 100 / 10" | bc` done
Kinesis Streams に投入されたレコード数を比較
HTTP リクエストするシェルスクリプトをしばらく動かし、CloudWatch Metrics の Kinesis -> PutRecords.Records の Average をとったのが以下です。
なかなか衝撃的な結果です。 オレンジの KPL 集約版(kinesis_producer)は1に対して、ブルーの非集約版(kinesis_streams)は 250 前後をさまよっています。
これはどういうことかというと、ログを KPL で集約することで、Kinesis Streams に送信するレコード数を激減できることを意味します。
Kinesis Streams の書き込み処理には
- シャードあたり 1000 records/second
- 500 records/transaction(PutRecords の場合)
というようなハードリミットが存在し、リミットに引っかからないように、シャード数を増やして対応したかたは多いかと思います。
御覧頂いたように、KPL を使うことで、シャード数の削減やスループットの向上を期待できそうです。
まとめ
fluentd/KPL -> Kinesis Streams -> Lambda の連携方法とログ集約による Kinesis Streams のスループット向上を見てきました。
- Kinesis Producer のコンピューターリソース
- ログデータの特性
- Kinesis Consumer の制限
などにより、KPL をすんなりと導入できるとは限らないと思いますが、Kinesis のスループットにお困りの場合は、一度検討してみてはいかがでしょうか。
参考
- https://speakerdeck.com/kanny/miao-jian-shu-mo-falseroguwoiigan-zinisuruakitekutiya
- http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html
- https://developers.google.com/protocol-buffers/
- https://github.com/awslabs/amazon-kinesis-producer
- https://github.com/awslabs/aws-fluent-plugin-kinesis
- https://github.com/awslabs/kinesis-aggregation
- https://blogs.aws.amazon.com/bigdata/post/Tx2JKA8KSZSJ5FF/Process-Amazon-Kinesis-Aggregated-Data-with-AWS-Lambda
- https://blogs.aws.amazon.com/bigdata/post/Tx3ET30EGDKUUI2/Implementing-Efficient-and-Reliable-Producers-with-the-Amazon-Kinesis-Producer-L